package com.xiaojiezhu.spark.setup3;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

/**
 * Demonstrates {@code mapPartitionsToPair}: each partition's lines are turned into
 * (first word, second word) pairs in a single pass over that partition's iterator.
 */
public class JavaMapPartition {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("app");
        // JavaSparkContext is Closeable; try-with-resources stops the context even if the job fails.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> rdd = sc.parallelize(Arrays.asList("hello world", "hello jame", "hello jie"));

            // mapPartitionsToPair already yields a JavaPairRDD of the helper's tuple type; no cast needed.
            JavaPairRDD<String, String> result = rdd.mapPartitionsToPair(JavaMapPartition::firstTwoWords);

            // collect() rather than collectAsMap(): every input line produces the key "hello",
            // and collectAsMap() would keep only one of those pairs.
            for (Tuple2<String, String> pair : result.collect()) {
                System.out.println(pair._1() + ":" + pair._2());
            }
        }
    }

    /**
     * Maps each line of one partition to a (first token, second token) pair, splitting on a
     * single space. Lines with fewer than two tokens are skipped rather than failing the task
     * with an {@link ArrayIndexOutOfBoundsException}.
     *
     * @param lines the lines of one partition
     * @return an iterator over the produced pairs, in input order
     */
    private static Iterator<Tuple2<String, String>> firstTwoWords(Iterator<String> lines) {
        List<Tuple2<String, String>> pairs = new ArrayList<>();
        while (lines.hasNext()) {
            String[] tokens = lines.next().split(" ");
            if (tokens.length < 2) {
                continue;
            }
            pairs.add(new Tuple2<>(tokens[0], tokens[1]));
        }
        return pairs.iterator();
    }
}
